feat: initial PoC Substrait consumer #1

Draft: wants to merge 53 commits into base: main

davisusanibar
Owner

PoC Java Substrait consumer

C++ Process:

arrow::Status execute_substraitv3() {
    std::cout << "Hello Substrait!" << std::endl;
    ARROW_ASSIGN_OR_RAISE(std::string substrait_json, GetSubstraitJSON());
    // Serialize the JSON plan and execute it; failures propagate as arrow::Status.
    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Buffer> plan_buffer,
                          arrow::engine::SerializeJsonPlan(substrait_json));
    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::RecordBatchReader> reader,
                          arrow::engine::ExecuteSerializedPlan(*plan_buffer));
    // Export the reader through the C stream interface.
    struct ArrowArrayStream c_stream;
    ARROW_RETURN_NOT_OK(arrow::ExportRecordBatchReader(reader, &c_stream));

    // Recover the values by importing the stream again.
    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::RecordBatchReader> imported_reader,
                          arrow::ImportRecordBatchReader(&c_stream));
    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Table> table,
                          arrow::Table::FromRecordBatchReader(imported_reader.get()));
    std::cout << "Values recovered: " << table->num_rows() << " rows and "
              << table->num_columns() << " columns" << std::endl;
    // It prints: Values recovered: 12 rows and 5 columns
    return arrow::Status::OK();
}

Java side (Java --> JNI --> C++)

Need to review why, on the Java side, the ArrowReader is populated with column names but not with any data values.

  @Test
  public void testBaseSubstraitRead() throws Exception {
    try (ArrowArrayStream arrowArrayStream = ArrowArrayStream.allocateNew(rootAllocator())) {
      if (!org.apache.arrow.dataset.substrait.JniWrapper.get().executeSerializedPlan(getSubstraitPlan(), arrowArrayStream.memoryAddress())) {
        System.out.println("Nothing to show!!!");
      }
      try (ArrowReader arrowReader = Data.importArrayStream(rootAllocator(), arrowArrayStream)){
        System.out.println(arrowReader.getVectorSchemaRoot().contentToTSVString());
        // It prints only the column names:
        // foo     __fragment_index        __batch_index   __last_in_fragment      __filename
        System.out.println(arrowReader.getVectorSchemaRoot().getSchema());
        // It prints: Schema<foo: Binary, __fragment_index: Int(32, true), __batch_index: Int(32, true), __last_in_fragment: Bool, __filename: Utf8>
      }
    }
  }

@github-actions

Thanks for opening a pull request!

If this is not a minor PR, could you open an issue for this pull request on GitHub? https://github.com/apache/arrow/issues/new/choose

Opening GitHub issues ahead of time contributes to the Openness of the Apache Arrow project.

Then could you also rename the pull request title in the following format?

GH-${GITHUB_ISSUE_ID}: [${COMPONENT}] ${SUMMARY}

or

MINOR: [${COMPONENT}] ${SUMMARY}

In the case of PARQUET issues on JIRA the title also supports:

PARQUET-${JIRA_ISSUE_ID}: [${COMPONENT}] ${SUMMARY}


@davisusanibar
Owner Author

The problem was that the client was not calling arrowReader.loadNextBatch() to start reading data.
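
To illustrate the fix, here is a minimal, dependency-free sketch of the pull-based contract behind that symptom: the schema is visible as soon as the reader exists, but rows only appear after loadNextBatch() is called. StubReader, its methods, and countRows are hypothetical stand-ins written for this note, not the Arrow API. In the real test the equivalent loop would presumably be `while (arrowReader.loadNextBatch()) { ... arrowReader.getVectorSchemaRoot() ... }`.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

final class BatchLoopSketch {

  /** Hypothetical stand-in for a pull-based batch reader; NOT the Arrow API. */
  static final class StubReader {
    private final List<String> columns;
    private final Iterator<List<String>> batches;
    // No rows are visible until the first successful loadNextBatch() call.
    private List<String> currentRows = new ArrayList<>();

    StubReader(List<String> columns, List<List<String>> batches) {
      this.columns = columns;
      this.batches = batches.iterator();
    }

    /** Column names are known up front, before any batch is loaded. */
    List<String> columns() {
      return columns;
    }

    /** Loads the next batch; returns false once the stream is exhausted. */
    boolean loadNextBatch() {
      if (!batches.hasNext()) {
        currentRows = new ArrayList<>();
        return false;
      }
      currentRows = batches.next();
      return true;
    }

    /** Rows of the most recently loaded batch (empty before the first load). */
    List<String> currentRows() {
      return currentRows;
    }
  }

  /** Drains the reader batch by batch and returns the total row count. */
  static int countRows(StubReader reader) {
    int total = 0;
    while (reader.loadNextBatch()) {
      total += reader.currentRows().size();
    }
    return total;
  }

  public static void main(String[] args) {
    StubReader reader =
        new StubReader(List.of("foo"), List.of(List.of("a", "b"), List.of("c")));
    // Before any load: the column name is there, but there are zero rows.
    System.out.println("columns=" + reader.columns()
        + " rowsBeforeLoad=" + reader.currentRows().size());
    // After draining the stream: all rows have been seen.
    System.out.println("rowsAfterLoop=" + countRows(reader));
  }
}
```

Reading the root before the first load reproduces the "only column names" output reported above; the loop is what makes the data visible.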

@@ -45,6 +45,7 @@ export ARROW_ORC
: ${ARROW_PLASMA:=ON}
export ARROW_PLASMA
: ${ARROW_S3:=ON}
: ${ARROW_SUBSTRAIT:=ON}
SC2223: This default assignment may cause DoS due to globbing. Quote it.

